{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Spark streaming with trace DSL"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "This notebook uses spark streaming and trace DSL to extract features from trace data\n",
    "and publish metrics to Prometheus."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%loadFromPOM\n",
    "    <dependency>\n",
    "      <groupId>org.apache.spark</groupId>\n",
    "      <artifactId>spark-core_2.12</artifactId>\n",
    "      <version>2.4.4</version>\n",
    "    </dependency>\n",
    "    <dependency>\n",
    "      <groupId>org.apache.spark</groupId>\n",
    "      <artifactId>spark-streaming_2.12</artifactId>\n",
    "      <version>2.4.4</version>\n",
    "    </dependency>\n",
    "    <dependency>\n",
    "      <groupId>org.apache.spark</groupId>\n",
    "      <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>\n",
    "      <version>2.4.4</version>\n",
    "    </dependency>\n",
    "    <dependency>\n",
    "      <groupId>org.apache.kafka</groupId>\n",
    "      <artifactId>kafka_2.12</artifactId>\n",
    "      <version>2.3.0</version>\n",
    "    </dependency>    \n",
    "<dependency>\n",
    "    <groupId>io.prometheus</groupId>\n",
    "    <artifactId>simpleclient_httpserver</artifactId>\n",
    "    <version>0.7.0</version>\n",
    "</dependency>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Install library to the local maven repository\n",
    "This step is only needed if trace DSL source code has been modified.\n",
    "Open terminal in Jupyter and run the following command:\n",
    "```\n",
    "cd work && ./mvnw clean install -DskipTests\n",
    "```"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "class Keys\n",
      "class org.apache.spark.SparkConf\n"
     ]
    }
   ],
   "source": [
    "%maven io.jaegertracing:jaeger-tracedsl:999-SNAPSHOT\n",
    "\n",
    "System.out.println(Keys.class);\n",
    "System.out.println(org.apache.spark.SparkConf.class);"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Define connection to Kafka"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "String kafkaServers = \"192.168.42.6:32632\";\n",
    "String kafkaTopic = \"jaeger-spans\";\n",
    "int prometheusPort = 9001;"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "19/11/05 16:40:18 INFO SparkContext: Running Spark version 2.4.4\n",
      "19/11/05 16:40:18 INFO SparkContext: Submitted application: Trace DSL\n",
      "19/11/05 16:40:18 INFO SecurityManager: Changing view acls to: jovyan\n",
      "19/11/05 16:40:18 INFO SecurityManager: Changing modify acls to: jovyan\n",
      "19/11/05 16:40:18 INFO SecurityManager: Changing view acls groups to: \n",
      "19/11/05 16:40:18 INFO SecurityManager: Changing modify acls groups to: \n",
      "19/11/05 16:40:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(jovyan); groups with view permissions: Set(); users  with modify permissions: Set(jovyan); groups with modify permissions: Set()\n",
      "19/11/05 16:40:18 INFO Utils: Successfully started service 'sparkDriver' on port 39681.\n",
      "19/11/05 16:40:18 INFO SparkEnv: Registering MapOutputTracker\n",
      "19/11/05 16:40:18 INFO SparkEnv: Registering BlockManagerMaster\n",
      "19/11/05 16:40:18 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information\n",
      "19/11/05 16:40:18 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up\n",
      "19/11/05 16:40:18 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-426a3dd1-2bd7-4781-9181-32a36d79a012\n",
      "19/11/05 16:40:18 INFO MemoryStore: MemoryStore started with capacity 6.8 GB\n",
      "19/11/05 16:40:18 INFO SparkEnv: Registering OutputCommitCoordinator\n",
      "19/11/05 16:40:18 INFO Utils: Successfully started service 'SparkUI' on port 4040.\n",
      "19/11/05 16:40:18 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://c8bb256338ca:4040\n",
      "19/11/05 16:40:18 INFO Executor: Starting executor ID driver on host localhost\n",
      "19/11/05 16:40:18 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33661.\n",
      "19/11/05 16:40:18 INFO NettyBlockTransferService: Server created on c8bb256338ca:33661\n",
      "19/11/05 16:40:18 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy\n",
      "19/11/05 16:40:18 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, c8bb256338ca, 33661, None)\n",
      "19/11/05 16:40:18 INFO BlockManagerMasterEndpoint: Registering block manager c8bb256338ca:33661 with 6.8 GB RAM, BlockManagerId(driver, c8bb256338ca, 33661, None)\n",
      "19/11/05 16:40:18 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, c8bb256338ca, 33661, None)\n",
      "19/11/05 16:40:18 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, c8bb256338ca, 33661, None)\n",
      "19/11/05 16:40:18 WARN KafkaUtils: overriding enable.auto.commit to false for executor\n",
      "19/11/05 16:40:18 WARN KafkaUtils: overriding auto.offset.reset to none for executor\n",
      "19/11/05 16:40:18 WARN KafkaUtils: overriding executor group.id to spark-executor-trace-aggregation-1572972018561\n",
      "19/11/05 16:40:18 WARN KafkaUtils: overriding receive.buffer.bytes to 65536 see KAFKA-3135\n",
      "19/11/05 16:40:18 INFO DirectKafkaInputDStream: Slide time = 5000 ms\n",
      "19/11/05 16:40:18 INFO DirectKafkaInputDStream: Storage level = Serialized 1x Replicated\n",
      "19/11/05 16:40:18 INFO DirectKafkaInputDStream: Checkpoint interval = null\n",
      "19/11/05 16:40:18 INFO DirectKafkaInputDStream: Remember interval = 5000 ms\n",
      "19/11/05 16:40:18 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka010.DirectKafkaInputDStream@4fb7b43e\n",
      "19/11/05 16:40:18 INFO MappedDStream: Slide time = 5000 ms\n",
      "19/11/05 16:40:18 INFO MappedDStream: Storage level = Serialized 1x Replicated\n",
      "19/11/05 16:40:18 INFO MappedDStream: Checkpoint interval = null\n",
      "19/11/05 16:40:18 INFO MappedDStream: Remember interval = 5000 ms\n",
      "19/11/05 16:40:18 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2593c304\n",
      "19/11/05 16:40:18 INFO ShuffledDStream: Slide time = 5000 ms\n",
      "19/11/05 16:40:18 INFO ShuffledDStream: Storage level = Serialized 1x Replicated\n",
      "19/11/05 16:40:18 INFO ShuffledDStream: Checkpoint interval = null\n",
      "19/11/05 16:40:18 INFO ShuffledDStream: Remember interval = 5000 ms\n",
      "19/11/05 16:40:18 INFO ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@7eb23b25\n",
      "19/11/05 16:40:18 INFO MapValuedDStream: Slide time = 5000 ms\n",
      "19/11/05 16:40:18 INFO MapValuedDStream: Storage level = Serialized 1x Replicated\n",
      "19/11/05 16:40:18 INFO MapValuedDStream: Checkpoint interval = null\n",
      "19/11/05 16:40:18 INFO MapValuedDStream: Remember interval = 5000 ms\n",
      "19/11/05 16:40:18 INFO MapValuedDStream: Initialized and validated org.apache.spark.streaming.dstream.MapValuedDStream@5d7c12a4\n",
      "19/11/05 16:40:18 INFO MappedDStream: Slide time = 5000 ms\n",
      "19/11/05 16:40:18 INFO MappedDStream: Storage level = Serialized 1x Replicated\n",
      "19/11/05 16:40:18 INFO MappedDStream: Checkpoint interval = null\n",
      "19/11/05 16:40:18 INFO MappedDStream: Remember interval = 5000 ms\n",
      "19/11/05 16:40:18 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@2f984b71\n",
      "19/11/05 16:40:18 INFO ForEachDStream: Slide time = 5000 ms\n",
      "19/11/05 16:40:18 INFO ForEachDStream: Storage level = Serialized 1x Replicated\n",
      "19/11/05 16:40:18 INFO ForEachDStream: Checkpoint interval = null\n",
      "19/11/05 16:40:18 INFO ForEachDStream: Remember interval = 5000 ms\n",
      "19/11/05 16:40:18 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@14283665\n",
      "19/11/05 16:40:18 INFO ConsumerConfig: ConsumerConfig values: \n",
      "\tallow.auto.create.topics = true\n",
      "\tauto.commit.interval.ms = 5000\n",
      "\tauto.offset.reset = earliest\n",
      "\tbootstrap.servers = [192.168.42.6:32632]\n",
      "\tcheck.crcs = true\n",
      "\tclient.dns.lookup = default\n",
      "\tclient.id = \n",
      "\tclient.rack = \n",
      "\tconnections.max.idle.ms = 540000\n",
      "\tdefault.api.timeout.ms = 60000\n",
      "\tenable.auto.commit = false\n",
      "\texclude.internal.topics = true\n",
      "\tfetch.max.bytes = 52428800\n",
      "\tfetch.max.wait.ms = 500\n",
      "\tfetch.min.bytes = 1\n",
      "\tgroup.id = trace-aggregation-1572972018561\n",
      "\tgroup.instance.id = null\n",
      "\theartbeat.interval.ms = 3000\n",
      "\tinterceptor.classes = []\n",
      "\tinternal.leave.group.on.close = true\n",
      "\tisolation.level = read_uncommitted\n",
      "\tkey.deserializer = class org.apache.kafka.common.serialization.StringDeserializer\n",
      "\tmax.partition.fetch.bytes = 1048576\n",
      "\tmax.poll.interval.ms = 300000\n",
      "\tmax.poll.records = 500\n",
      "\tmetadata.max.age.ms = 300000\n",
      "\tmetric.reporters = []\n",
      "\tmetrics.num.samples = 2\n",
      "\tmetrics.recording.level = INFO\n",
      "\tmetrics.sample.window.ms = 30000\n",
      "\tpartition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]\n",
      "\treceive.buffer.bytes = 65536\n",
      "\treconnect.backoff.max.ms = 1000\n",
      "\treconnect.backoff.ms = 50\n",
      "\trequest.timeout.ms = 30000\n",
      "\tretry.backoff.ms = 100\n",
      "\tsasl.client.callback.handler.class = null\n",
      "\tsasl.jaas.config = null\n",
      "\tsasl.kerberos.kinit.cmd = /usr/bin/kinit\n",
      "\tsasl.kerberos.min.time.before.relogin = 60000\n",
      "\tsasl.kerberos.service.name = null\n",
      "\tsasl.kerberos.ticket.renew.jitter = 0.05\n",
      "\tsasl.kerberos.ticket.renew.window.factor = 0.8\n",
      "\tsasl.login.callback.handler.class = null\n",
      "\tsasl.login.class = null\n",
      "\tsasl.login.refresh.buffer.seconds = 300\n",
      "\tsasl.login.refresh.min.period.seconds = 60\n",
      "\tsasl.login.refresh.window.factor = 0.8\n",
      "\tsasl.login.refresh.window.jitter = 0.05\n",
      "\tsasl.mechanism = GSSAPI\n",
      "\tsecurity.protocol = PLAINTEXT\n",
      "\tsend.buffer.bytes = 131072\n",
      "\tsession.timeout.ms = 10000\n",
      "\tssl.cipher.suites = null\n",
      "\tssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]\n",
      "\tssl.endpoint.identification.algorithm = https\n",
      "\tssl.key.password = null\n",
      "\tssl.keymanager.algorithm = SunX509\n",
      "\tssl.keystore.location = null\n",
      "\tssl.keystore.password = null\n",
      "\tssl.keystore.type = JKS\n",
      "\tssl.protocol = TLS\n",
      "\tssl.provider = null\n",
      "\tssl.secure.random.implementation = null\n",
      "\tssl.trustmanager.algorithm = PKIX\n",
      "\tssl.truststore.location = null\n",
      "\tssl.truststore.password = null\n",
      "\tssl.truststore.type = JKS\n",
      "\tvalue.deserializer = class ProtoSpanDeserializer\n",
      "\n",
      "19/11/05 16:40:18 WARN ConsumerConfig: The configuration 'startingOffsets' was supplied but isn't a known config.\n",
      "19/11/05 16:40:18 WARN ConsumerConfig: The configuration 'endingOffsets' was supplied but isn't a known config.\n",
      "19/11/05 16:40:18 INFO AppInfoParser: Kafka version: 2.3.0\n",
      "19/11/05 16:40:18 INFO AppInfoParser: Kafka commitId: fc1aaa116b661c8a\n",
      "19/11/05 16:40:18 INFO AppInfoParser: Kafka startTimeMs: 1572972018814\n",
      "19/11/05 16:40:18 INFO KafkaConsumer: [Consumer clientId=consumer-5, groupId=trace-aggregation-1572972018561] Subscribed to topic(s): jaeger-spans\n"
     ]
    },
    {
     "ename": "EvaluationInterruptedException",
     "evalue": "Evaluator was interrupted while executing: 'ssc.start();'",
     "output_type": "error",
     "traceback": [
      "\u001b[1m\u001b[30m|   \u001b[1m\u001b[30mssc.start();\u001b[0m",
      "\u001b[1m\u001b[31mEvaluation interrupted.\u001b[0m"
     ]
    }
   ],
   "source": [
    "import io.prometheus.client.exporter.HTTPServer;\n",
    "import java.io.IOException;\n",
    "import java.util.Collections;\n",
    "import java.util.HashMap;\n",
    "import java.util.Map;\n",
    "import java.util.Set;\n",
    "import java.util.stream.Collectors;\n",
    "import java.util.stream.StreamSupport;\n",
    "import org.apache.kafka.clients.consumer.ConsumerRecord;\n",
    "import org.apache.kafka.common.serialization.StringDeserializer;\n",
    "import org.apache.spark.SparkConf;\n",
    "import org.apache.spark.api.java.JavaSparkContext;\n",
    "import org.apache.spark.streaming.Duration;\n",
    "import org.apache.spark.streaming.api.java.JavaDStream;\n",
    "import org.apache.spark.streaming.api.java.JavaInputDStream;\n",
    "import org.apache.spark.streaming.api.java.JavaPairDStream;\n",
    "import org.apache.spark.streaming.api.java.JavaStreamingContext;\n",
    "import org.apache.spark.streaming.kafka010.ConsumerStrategies;\n",
    "import org.apache.spark.streaming.kafka010.KafkaUtils;\n",
    "import org.apache.spark.streaming.kafka010.LocationStrategies;\n",
    "import org.apache.tinkerpop.gremlin.structure.Graph;\n",
    "import scala.Tuple2;\n",
    "import io.jaegertracing.analytics.gremlin.*;\n",
    "import io.jaegertracing.analytics.query.*;\n",
    "import io.jaegertracing.analytics.model.*;\n",
    "\n",
    "HTTPServer server = new HTTPServer(prometheusPort);\n",
    "\n",
    "SparkConf sparkConf = new SparkConf().setAppName(\"Trace DSL\").setMaster(\"local[*]\");\n",
    "JavaSparkContext sc = new JavaSparkContext(sparkConf);\n",
    "JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(5000));\n",
    "\n",
    "Set<String> topics = Collections.singleton(kafkaTopic);\n",
    "Map<String, Object> kafkaParams = new HashMap<>();\n",
    "kafkaParams.put(\"bootstrap.servers\", kafkaServers);\n",
    "kafkaParams.put(\"key.deserializer\", StringDeserializer.class);\n",
    "kafkaParams. put(\"value.deserializer\", ProtoSpanDeserializer.class);\n",
    "// hack to start always from beginning\n",
    "kafkaParams.put(\"group.id\", \"trace-aggregation-\" + System.currentTimeMillis());\n",
    "kafkaParams.put(\"auto.offset.reset\", \"earliest\");\n",
    "kafkaParams.put(\"enable.auto.commit\", false);\n",
    "kafkaParams.put(\"startingOffsets\", \"earliest\");\n",
    "kafkaParams.put(\"endingOffsets\", \"latest\");\n",
    "\n",
    "JavaInputDStream<ConsumerRecord<String, Span>> messages =\n",
    "    KafkaUtils.createDirectStream(\n",
    "        ssc,\n",
    "        LocationStrategies.PreferConsistent(),\n",
    "        ConsumerStrategies.Subscribe(topics, kafkaParams));\n",
    "\n",
    "JavaPairDStream<String, Span> traceIdSpanTuple = messages.mapToPair(record -> {\n",
    "  return new Tuple2<>(record.value().traceId, record.value());\n",
    "});\n",
    "\n",
    "JavaDStream<Trace> tracesStream = traceIdSpanTuple.groupByKey().map(traceIdSpans -> {\n",
    "  Iterable<Span> spans = traceIdSpans._2();\n",
    "  Trace trace = new Trace();\n",
    "  trace.traceId = traceIdSpans._1();\n",
    "  trace.spans = StreamSupport.stream(spans.spliterator(), false)\n",
    "      .collect(Collectors.toList());\n",
    "  return trace;\n",
    "});\n",
    "\n",
    "tracesStream.foreachRDD((traceRDD, time) -> {\n",
    "  traceRDD.foreach(trace -> {\n",
    "    Graph graph = GraphCreator.create(trace);\n",
    "    TraceDepth.calculate(graph);\n",
    "  });\n",
    "});\n",
    "\n",
    "ssc.start();\n",
    "ssc.awaitTermination();"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Stop"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {
    "scrolled": true
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "19/11/05 16:40:12 WARN StreamingContext: StreamingContext has not been started yet\n",
      "19/11/05 16:40:12 INFO SparkUI: Stopped Spark web UI at http://c8bb256338ca:4040\n",
      "19/11/05 16:40:12 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!\n",
      "19/11/05 16:40:12 INFO MemoryStore: MemoryStore cleared\n",
      "19/11/05 16:40:12 INFO BlockManager: BlockManager stopped\n",
      "19/11/05 16:40:12 INFO BlockManagerMaster: BlockManagerMaster stopped\n",
      "19/11/05 16:40:12 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!\n",
      "19/11/05 16:40:12 INFO SparkContext: Successfully stopped SparkContext\n",
      "19/11/05 16:40:12 WARN StreamingContext: StreamingContext has already been stopped\n",
      "19/11/05 16:40:12 INFO SparkContext: SparkContext already stopped.\n"
     ]
    }
   ],
   "source": [
    "if (server != null) server.stop();\n",
    "if (ssc != null) { ssc.stop(); ssc.close();}\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Get Prometheus metrics\n",
    "\n",
    "Open browser on the host running this notebook to see exported Prometheus metrics e.g. http://localhost:9001. \n",
    "Or configure Prometheus to scarep metrics from the host where this notebook is running."
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Java",
   "language": "java",
   "name": "java"
  },
  "language_info": {
   "codemirror_mode": "java",
   "file_extension": ".jshell",
   "mimetype": "text/x-java-source",
   "name": "Java",
   "pygments_lexer": "java",
   "version": "11.0.5+10"
  },
  "widgets": {
   "application/vnd.jupyter.widget-state+json": {
    "state": {},
    "version_major": 2,
    "version_minor": 0
   }
  },
  "pycharm": {
   "stem_cell": {
    "cell_type": "raw",
    "source": [],
    "metadata": {
     "collapsed": false
    }
   }
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
